Skip to content

[CAE-1088] feat: RESP3 notifications support #3418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 53 commits into
base: master
Choose a base branch
from

Conversation

ndyakov
Copy link
Member

@ndyakov ndyakov commented Jun 26, 2025

This PR adds RESP3 push notifications support, enabling clients to handle server-initiated messages beyond traditional request-response patterns. The handlers for the notifications can be registered and deregistered by the end user. Added a special protected flag when registering a handler, so internal important handlers cannot be deregistered ( hitless upgrades will probably use this ).

Please review thoroughly. This is going to be executed as part of each read from connection and it is important it is performant both when there is a push notification to be processed and when there isn't one.

notes:

  • still can refactor error handling a bit.
  • should look at better timeout for checking the push notifications.

@ndyakov ndyakov requested a review from Copilot June 26, 2025 17:44
Copilot

This comment was marked as outdated.

ndyakov added 13 commits June 27, 2025 01:36
- Add PushNotificationRegistry for managing notification handlers
- Add PushNotificationProcessor for processing RESP3 push notifications
- Add client methods for registering push notification handlers
- Add PubSub integration for handling generic push notifications
- Add comprehensive test suite with 100% coverage
- Add push notification demo example

This system allows handling any arbitrary RESP3 push notification
with registered handlers, not just specific notification types.
- Change PushNotificationRegistry to allow only one handler per command
- RegisterHandler methods now return error if handler already exists
- Update UnregisterHandler to remove handler by command only
- Update all client methods to return errors for duplicate registrations
- Update comprehensive test suite to verify single handler behavior
- Add specific test for duplicate handler error scenarios

This prevents handler conflicts and ensures predictable notification
routing with clear error handling for registration conflicts.
- Remove all global push notification handler functionality
- Simplify registry to support only single handler per notification type
- Enable push notifications by default for RESP3 connections
- Update comprehensive test suite to remove global handler tests
- Update demo to show multiple specific handlers instead of global handlers
- Always respect custom processors regardless of PushNotifications flag

Push notifications are now automatically enabled for RESP3 and each
notification type has a single dedicated handler for predictable behavior.
- Add PushNotificationProcessor field to pool.Conn for connection-level processing
- Modify connection pool Put() and isHealthyConn() to handle push notifications
- Process pending push notifications before discarding connections
- Pass push notification processor to connections during creation
- Update connection pool options to include push notification processor
- Add comprehensive test for connection health check integration

This prevents connections with buffered push notification data from being
incorrectly discarded by the connection health check, ensuring push
notifications are properly processed and connections are reused.
…d clients

- Remove unused Timestamp and Source fields from PushNotificationInfo
- Add pushProcessor to newConn function to ensure Conn instances have push notifications
- Add push notification methods to Conn type for consistency
- Ensure cloned clients and Conn instances preserve push notification functionality

This fixes issues where:
1. PushNotificationInfo had unused fields causing confusion
2. Conn instances created via client.Conn() lacked push notification support
3. All client types now consistently support push notifications
- Add 10 new unit tests covering all previously untested code paths
- Test connection pool integration with push notifications
- Test connection health check integration
- Test Conn type push notification methods
- Test cloned client push notification preservation
- Test PushNotificationInfo structure validation
- Test edge cases and error scenarios
- Test custom processor integration
- Test disabled push notification scenarios

Total coverage now includes:
- 20 existing push notification tests
- 10 new comprehensive coverage tests
- All new code paths from connection pool integration
- All Conn methods and cloning functionality
- Edge cases and error handling scenarios
- Remove RegisterPushNotificationHandlerFunc methods from all types
- Remove PushNotificationHandlerFunc type adapter
- Keep only RegisterPushNotificationHandler method for cleaner interface
- Remove unnecessary push notification constants (keep only Redis Cluster ones)
- Update all tests to use simplified interface with direct handler implementations

Benefits:
- Cleaner, simpler API with single registration method
- Reduced code complexity and maintenance burden
- Focus on essential Redis Cluster push notifications only
- Users implement PushNotificationHandler interface directly
- No functional changes, just interface simplification
- Add sync.RWMutex to PushNotificationProcessor struct
- Protect enabled field access with read/write locks in IsEnabled() and SetEnabled()
- Use thread-safe IsEnabled() method in ProcessPendingNotifications()
- Fix concurrent access to enabled field that was causing data races

This resolves the race condition between goroutines calling IsEnabled() and
SetEnabled() concurrently, ensuring thread-safe access to the enabled field.
…ationName

- Add protected flag to RegisterHandler methods across all types
- Protected handlers cannot be unregistered, UnregisterHandler returns error
- Rename 'command' parameter to 'pushNotificationName' for clarity
- Update PushNotificationInfo.Command field to Name field
- Add comprehensive test for protected handler functionality
- Update all existing tests to use new protected parameter (false by default)
- Improve error messages to use 'push notification' terminology

Benefits:
- Critical handlers can be protected from accidental unregistration
- Clearer naming reflects that these are notification names, not commands
- Better error handling with informative error messages
- Backward compatible (existing handlers work with protected=false)
- Add VoidPushNotificationProcessor that reads and discards push notifications
- Create PushNotificationProcessorInterface for consistent behavior
- Always provide a processor (real or void) instead of nil
- VoidPushNotificationProcessor properly cleans RESP3 push notifications from buffer
- Remove all nil checks throughout codebase for cleaner, safer code
- Update tests to expect VoidPushNotificationProcessor when disabled

Benefits:
- Eliminates nil pointer risks throughout the codebase
- Follows null object pattern for safer operation
- Properly handles RESP3 push notifications even when disabled
- Consistent interface regardless of push notification settings
- Cleaner code without defensive nil checks everywhere
…ethods

- Remove enabled field from PushNotificationProcessor struct
- Remove IsEnabled() and SetEnabled() methods from processor interface
- Remove enabled parameter from NewPushNotificationProcessor()
- Update all interfaces in pool package to remove IsEnabled requirement
- Simplify processor logic - if processor exists, it works
- VoidPushNotificationProcessor handles disabled case by discarding notifications
- Update all tests to use simplified interface without enable/disable logic

Benefits:
- Simpler, cleaner interface with less complexity
- No unnecessary state management for enabled/disabled
- VoidPushNotificationProcessor pattern handles disabled case elegantly
- Reduced cognitive overhead - processors just work when set
- Eliminates redundant enabled checks throughout codebase
- More predictable behavior - set processor = it works
- Add nil check in newConn to create VoidPushNotificationProcessor when needed
- Fix tests to use Protocol 2 for disabled push notification scenarios
- Prevent nil pointer dereference in transaction and connection contexts
- Ensure consistent behavior across all connection creation paths

The panic was occurring because newConn could create connections with nil
pushProcessor when options didn't have a processor set. Now we always
ensure a processor exists (real or void) to maintain the 'never nil' guarantee.
@ndyakov ndyakov force-pushed the ndyakov/CAE-1088-resp3-notification-handlers branch from 8bb0e59 to 8006fab Compare June 26, 2025 22:36
ndyakov added 3 commits June 27, 2025 01:47
- Add push processor initialization to NewSentinelClient to prevent nil pointer dereference
- Add GetPushNotificationProcessor and RegisterPushNotificationHandler methods to SentinelClient
- Use VoidPushNotificationProcessor for Sentinel (typically doesn't need push notifications)
- Ensure consistent behavior across all client types that inherit from baseClient

This fixes the panic that was occurring in Sentinel contexts where the pushProcessor
field was nil, causing segmentation violations when processing commands.
- Copy pushProcessor from parent client to transaction in newTx()
- Ensure transactions inherit push notification processor from parent client
- Prevent nil pointer dereference in transaction contexts (Watch, Unwatch, etc.)
- Maintain consistent push notification behavior across all Redis operations

This fixes the panic that was occurring in transaction examples where the
transaction's baseClient had a nil pushProcessor field, causing segmentation
violations during transaction operations like Watch and Unwatch.
- Add push processor initialization to NewFailoverClient to prevent nil pointer dereference
- Use VoidPushNotificationProcessor for failover clients (typically don't need push notifications)
- Ensure consistent behavior across all client creation paths including failover scenarios
- Complete the coverage of all client types that inherit from baseClient

This fixes the final nil pointer dereference that was occurring in failover client
contexts where the pushProcessor field was nil, causing segmentation violations
during Redis operations in sentinel-managed failover scenarios.
ndyakov added 4 commits June 27, 2025 13:59
…lation

- Add GetHandler() method to PushNotificationProcessorInterface for better encapsulation
- Add GetPushNotificationHandler() convenience method to Client and SentinelClient
- Remove HasHandlers() check from ProcessPendingNotifications to ensure notifications are always consumed
- Use PushNotificationProcessorInterface in internal pool package for proper abstraction
- Maintain GetRegistry() for backward compatibility and testing
- Update pubsub to use GetHandler() instead of GetRegistry() for cleaner code

Benefits:
- Better API encapsulation - no need to expose entire registry
- Cleaner interface - direct access to specific handlers
- Always consume push notifications from reader regardless of handler presence
- Proper abstraction in internal pool package
- Backward compatibility maintained
- Consistent behavior across all processor types
… FailoverClient

- Add PushNotifications field to FailoverOptions struct
- Update clientOptions() to pass PushNotifications field to Options
- Change SentinelClient and FailoverClient initialization to use same logic as regular Client
- Both clients now support real push notification processors when enabled
- Both clients use void processors only when explicitly disabled
- Consistent behavior across all client types (Client, SentinelClient, FailoverClient)

Benefits:
- SentinelClient and FailoverClient can now fully utilize push notifications
- Consistent API across all client types
- Real processors when push notifications are enabled
- Void processors only when explicitly disabled
- Equal push notification capabilities for all Redis client types
…better encapsulation

- Remove GetRegistry() method from PushNotificationProcessorInterface
- Enforce use of GetHandler() method for cleaner API design
- Add GetRegistryForTesting() method for test access only
- Update all tests to use new testing helper methods
- Maintain clean separation between public API and internal implementation

Benefits:
- Better encapsulation - no direct registry access from public interface
- Cleaner API - forces use of GetHandler() for specific handler access
- Consistent interface design across all processor types
- Internal registry access only available for testing purposes
- Prevents misuse of registry in production code
…anic

- Add nil check for proto.Reader parameter in both PushNotificationProcessor and VoidPushNotificationProcessor
- Prevent segmentation violation when ProcessPendingNotifications is called with nil reader
- Return early with nil error when reader is nil (graceful handling)
- Fix panic in TestProcessPendingNotificationsEdgeCases test

This addresses the runtime panic that occurred when rd.Buffered() was called on a nil reader,
ensuring robust error handling in edge cases where the reader might not be properly initialized.
@ndyakov ndyakov force-pushed the ndyakov/CAE-1088-resp3-notification-handlers branch from 4e35707 to 9a7a5c8 Compare June 27, 2025 12:05
ndyakov added 4 commits June 27, 2025 16:27
- Remove unnecessary handlerWrapper complexity from push notifications
- Use separate maps for handlers and protection status in registry
- Store handlers directly without indirection layer
- Maintain same instance identity for registered/retrieved handlers
- Preserve all protected handler functionality with cleaner implementation

Changes:
- internal/pushnotif/registry.go: Use separate handlers and protected maps
- push_notifications.go: Remove handlerWrapper, store handlers directly
- Maintain thread-safe operations with simplified code structure

Benefits:
- Reduced memory overhead (no wrapper objects)
- Direct handler storage without type conversion
- Cleaner, more maintainable code
- Same functionality with better performance
- Eliminated unnecessary complexity layer
- Preserved all existing behavior and safety guarantees
- TestClientWithoutPushNotifications: Now expects error instead of nil
- TestClientPushNotificationEdgeCases: Now expects error instead of nil
…tions

- Fix TestConnWithoutPushNotifications to expect errors instead of nil
- Update test to verify error messages contain helpful information
- Add strings import for error message validation
- Maintain consistency with improved developer experience approach

The test now correctly expects errors when trying to register handlers on
connections with disabled push notifications, providing immediate feedback
to developers about configuration issues rather than silent failures.

This aligns with the improved developer experience where VoidProcessor
returns descriptive errors instead of silently ignoring registrations.
@ndyakov ndyakov marked this pull request as ready for review June 27, 2025 14:49
ndyakov added 3 commits July 4, 2025 21:19
Remove internal/pushprocessor and pushnotif packages that contained duplicate
and unresolved types. All push notification functionality is now consolidated
in the root package with direct type resolution.

Removed Packages:
- internal/pushprocessor/ - contained duplicate Registry, Processor, VoidProcessor
- pushnotif/ - contained interface wrappers that are no longer needed

Benefits:
- Single source of truth for all push notification logic
- No duplicate implementations or unresolved type references
- Cleaner codebase with all functionality in root package
- Eliminated confusion between internal and public interfaces
- Simplified architecture with direct type usage

All functionality remains intact and tests pass. The root package now contains
the complete, self-contained push notification implementation with concrete
types and no external dependencies.
Split push notification implementation into focused, maintainable files for
better code organization and easier navigation. Each file now has a clear
responsibility and contains related functionality.

File Organization:

1. push_notifications.go (Main API):
   - Push notification constants (MOVING, MIGRATING, etc.)
   - PushNotificationHandler interface
   - PushNotificationProcessorInterface
   - Public API wrappers (PushNotificationRegistry, PushNotificationProcessor)
   - Main entry point for push notification functionality

2. push_notification_handler_context.go (Context):
   - PushNotificationHandlerContext interface
   - pushNotificationHandlerContext concrete implementation
   - NewPushNotificationHandlerContext constructor
   - All context-related functionality with concrete type getters

3. push_notification_processor.go (Core Logic):
   - Registry implementation for handler management
   - Processor implementation for notification processing
   - VoidProcessor implementation for RESP2 connections
   - Core processing logic and notification filtering

Benefits:
- Clear separation of concerns between files
- Easier to navigate and maintain codebase
- Focused files with single responsibilities
- Better code organization for large codebase
- Simplified debugging and testing

File Responsibilities:
- Main API: Public interfaces and constants
- Context: Handler context with concrete type access
- Processor: Core processing logic and registry management

All functionality remains intact with improved organization. Tests pass
and compilation succeeds with the new file structure.
@ndyakov ndyakov force-pushed the ndyakov/CAE-1088-resp3-notification-handlers branch from 493ec66 to 604c8e3 Compare July 5, 2025 02:04
@ndyakov ndyakov force-pushed the ndyakov/CAE-1088-resp3-notification-handlers branch from e96c873 to b23f43c Compare July 5, 2025 03:21
@ndyakov ndyakov force-pushed the ndyakov/CAE-1088-resp3-notification-handlers branch from 7b883d0 to 7a0f316 Compare July 5, 2025 10:09
@ndyakov ndyakov force-pushed the ndyakov/CAE-1088-resp3-notification-handlers branch from af78f0d to 225c0bf Compare July 5, 2025 10:34
@ndyakov ndyakov requested a review from Copilot July 5, 2025 10:51
Copilot

This comment was marked as outdated.

@ndyakov ndyakov force-pushed the ndyakov/CAE-1088-resp3-notification-handlers branch from 9daeb75 to be3a6c6 Compare July 16, 2025 15:27
@ndyakov ndyakov requested a review from Copilot July 16, 2025 15:27
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements RESP3 push notification support in the Redis client, enabling handling of server-initiated messages beyond traditional request-response patterns. The implementation includes user-configurable notification handlers with a special "protected" flag for internal handlers to prevent accidental deregistration.

  • Adds a complete push notification framework with processors, handlers, and registry management
  • Integrates push notification processing throughout the Redis client lifecycle (connections, pipelines, pub/sub)
  • Provides both active (RESP3) and void (RESP2) processors with performance optimizations

Reviewed Changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
push/*.go Core push notification framework with processors, handlers, registry, and error definitions
redis.go Main client integration with push notification processing at all read points
sentinel.go Failover and sentinel client support for push notifications
pubsub.go Pub/sub integration with push notification handling
options.go Configuration options for push notification processors
internal/proto/reader.go RESP3 protocol parsing for push notification name extraction
internal/pool/*.go Connection pool optimizations for push notification data detection
Comments suppressed due to low confidence (2)

redis.go:690

  • [nitpick] The method name 'pipelineReadCmds' is inconsistent with Go naming conventions. Should be 'pipelineReadCmds' or better yet 'readPipelineCommands' to follow verb-noun pattern.
		return c.pipelineReadCmds(ctx, cn, rd, cmds)

push/processor.go:74

  • [nitpick] The function name 'willHandleNotificationInClient' is unclear. Consider renaming to 'shouldSkipNotification' or 'isHandledByClient' to better express its purpose of determining whether to skip processing.
		if willHandleNotificationInClient(notificationName) {

// NOTE: almost no timeouts are set for this read, so it should not block
// longer than necessary, 50us should be plenty of time to read if there are any push notifications
// on the socket
return cn.WithReader(ctx, 50*time.Microsecond, func(rd *proto.Reader) error {
Copy link
Preview

Copilot AI Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded timeout of 50 microseconds should be configurable or extracted as a named constant. This critical performance parameter affects push notification processing and may need tuning for different environments.

Copilot uses AI. Check for mistakes.

// Read the push notification
_, err = rd.ReadReply()
if err != nil {
internal.Logger.Printf(context.Background(), "push: error reading push notification: %v", err)
Copy link
Preview

Copilot AI Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using context.Background() instead of the provided ctx parameter loses the original context's deadline, cancellation, and values. Should use the ctx parameter for proper context propagation.

Suggested change
internal.Logger.Printf(context.Background(), "push: error reading push notification: %v", err)
internal.Logger.Printf(ctx, "push: error reading push notification: %v", err)

Copilot uses AI. Check for mistakes.

@@ -90,6 +90,92 @@ func (r *Reader) PeekReplyType() (byte, error) {
return b[0], nil
}

func (r *Reader) PeekPushNotificationName() (string, error) {
Copy link
Preview

Copilot AI Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method performs complex string parsing on every call without caching. Consider caching the result if the method is called multiple times on the same data, or document that it should only be called once per notification.

Copilot uses AI. Check for mistakes.

push/errors.go Outdated
Comment on lines 93 to 95
return fmt.Sprintf("%s %s failed for '%s': %s (%v)", e.ProcessorType, e.Operation, e.PushNotificationName, e.Reason, e.Err)
}
return fmt.Sprintf("%s %s failed for '%s': %s", e.ProcessorType, e.Operation, e.PushNotificationName, e.Reason)
Copy link
Preview

Copilot AI Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The error message format includes the push notification name even when it might be empty or not relevant for certain operations. Consider making the notification name optional in the message format.

Suggested change
return fmt.Sprintf("%s %s failed for '%s': %s (%v)", e.ProcessorType, e.Operation, e.PushNotificationName, e.Reason, e.Err)
}
return fmt.Sprintf("%s %s failed for '%s': %s", e.ProcessorType, e.Operation, e.PushNotificationName, e.Reason)
if e.PushNotificationName != "" {
return fmt.Sprintf("%s %s failed for '%s': %s (%v)", e.ProcessorType, e.Operation, e.PushNotificationName, e.Reason, e.Err)
}
return fmt.Sprintf("%s %s failed: %s (%v)", e.ProcessorType, e.Operation, e.Reason, e.Err)
}
if e.PushNotificationName != "" {
return fmt.Sprintf("%s %s failed for '%s': %s", e.ProcessorType, e.Operation, e.PushNotificationName, e.Reason)
}
return fmt.Sprintf("%s %s failed: %s", e.ProcessorType, e.Operation, e.Reason)

Copilot uses AI. Check for mistakes.

@@ -456,6 +460,12 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
if isBadConn(err, false, c.opt.Addr) {
c.connPool.Remove(ctx, cn, err)
} else {
// process any pending push notifications before returning the connection to the pool
if err := c.processPushNotifications(ctx, cn); err != nil {
Copy link
Preview

Copilot AI Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Processing push notifications on every connection release could impact performance. Consider batching or using a background goroutine for non-critical notifications, especially since errors are only logged.

Copilot uses AI. Check for mistakes.

}

// peek 36 bytes at most, should be enough to read the push notification name
toPeek := 36
Copy link
Preview

Copilot AI Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number 36 for peek buffer size should be defined as a named constant with documentation explaining why this specific size was chosen.

Copilot uses AI. Check for mistakes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant